原文地址:http://notes.stephenholiday.com/Kafka.pdf
太长不看:
相对于JMS等其他的消息系统,Kafka舍弃了很多功能,以达到性能上的提升。
论文讲述了Kafka设计上的取舍,以及提升性能的很多点。
摘要
日志处理已经成为消费互联网公司数据管道的重要组成部分。
我们将开始介绍Kafka,这是一个我们开发出用于收集和传递大批量的日志数据,并且具有低延迟的分布式消息传递系统。
Kafka融合了现有的日志聚合器和消息传递系统的思想,适用于消费离线和在线消息。
我们在Kafka中做了不少非常规但又实用的设计,使我们的系统具有高效和扩展性。
我们的实验结果表明,与两种流行的消息传递系统相比,Kafka具有优越的性能。
我们在生产中使用Kafka已经有一段时间了,它每天要处理数百GB的新数据。
1. 介绍
任何一家大型互联网公司都会产生大量的 “日志 “数据。
这些数据通常包括:
用户活动事件,包括登录、页面浏览、点击、”喜欢”、分享、评论和搜索查询
运营指标,如服务调用堆栈、调用延迟、错误,以及系统指标,如CPU、内存、网络或磁盘利用率等。
长期以来,日志数据一直是分析的一个组成部分,用于跟踪用户参与度、系统利用率和其他指标。
然而最近互联网应用的趋势使得活动数据成为产品数据管道的一部分,直接用于网站功能中。
这些用途包括:
- 搜索相关性
- 活动流中的受欢迎或共同出现的项目产生的推荐
- 广告定位和报告
- 防止滥用行为的安全应用,如垃圾邮件或未经授权的数据爬取
- 新闻联播功能,将用户的状态更新或行动汇总起来,供其 “朋友 “阅读。
这种生产、实时使用的日志数据给数据系统带来了新的挑战,因为它的数据量比 “真实 “的数据要大好几个数量级。
例如,搜索、推荐和广告往往需要计算颗粒化的点击率,这不仅会产生每一个用户点击的日志记录,还会产生每个页面上几十个未点击的项目的日志记录。
中国移动每天收集5-8TB的电话通话记录,Facebook每天则收集了近6TB的各种用户活动事件。
许多早期处理这类数据的系统都是依靠从生产服务器上实际收集日志文件进行分析。
近年来,一些专门的分布式日志聚合器已经发布,包括Facebook的Scribe[6]、Yahoo的Data Highway和Cloudera的Flume。
这些系统主要是为了收集日志数据,并将日志数据加载到数据仓库或Hadoop[8]中进行离线消费。
在LinkedIn(一家社交网站),我们发现除了传统的离线分析之外,我们还需要以不超过几秒的延迟支持上述大部分实时应用。
我们构建了一种新型的日志处理的消息传递系统,称为Kafka,它结合了传统日志聚合器和消息传递系统的优点。
一方面,Kafka具有分布式和可扩展性,并提供了高吞吐量。
另一方面,Kafka提供了类似于消息传递系统的API,允许应用程序实时消耗日志事件。
Kafka已经开源,并在LinkedIn的生产中成功使用了6个多月。
它极大地简化了我们的基础设施,因为我们可以利用一个单一的软件来在线和离线消费各种类型的日志数据。
本文的其余部分安排如下。
- 在第2节中,我们重新审视了传统的消息传递系统和日志聚合器。
- 在第3节中,我们描述了Kafka的架构及其关键设计原则。
- 在第4节中,我们描述了我们在LinkedIn上部署的Kafka
- 在第5节中描述了Kafka的性能结果。
- 我们在第6节中讨论了未来的工作
- 在第6节中做了总结。
2. 相关工作
传统的企业消息系统已经存在了很长时间,通常在处理异步数据流的事件总线中起着至关重要的作用。
然而,有几个原因导致它们往往不能很好地适应日志处理。
首先,企业级系统提供的特性与日志处理该有的不匹配。那些系统往往侧重于提供丰富的交付保证。
例如,IBM Websphere MQ具有事务式支持,允许一个应用程序将消息以原子方式插入到多个队列中。
而JMS规范允许每个消息在消费后被确认消费,消费顺序可能是无序的。(没看懂,对JMS不了解,脑补了下,乱序消费并幂等的意思?)
这样的交付保证对于收集日志数据来说往往是矫枉过正的。偶尔丢失几个页面浏览事件当然不是世界末日。
那些不需要的功能往往会增加这些系统的API和底层实现的复杂性。
其次,相比较首要设计约束功能,许多系统并不是那样强烈地关注吞吐量。例如,JMS没有API允许生产者明确地将多个消息批量化为一个请求。这意味着每个消息都需要进行一次完整的TCP/IP往返,这对于我们领域的吞吐量要求是不可行的。
第三,那些系统在分布式支持方面比较弱。没有简单的方法可以在多台机器上对消息进行分区和存储。
最后,许多消息系统假设消息会被近似实时消费掉,未被消费的消息量总是相当小。
导致如果出现消息累积,它们的性能就会大大降低。比如当数据仓库等离线消耗者对消息系统做周期性的大负载消费,而不是连续消费数据时。
在过去几年里,已经建立了一些专门的日志聚合器。
比如Facebook使用了一个叫Scribe的系统,每个前端机器可以通过网络向一组Scribe机器发送日志数据。
每台Scribe机器聚合日志条目,并定期将其转储到HDFS或NFS设备上。
雅虎的数据高速公路项目也有类似的数据传递方式,一组机器聚合来自客户端的事件,按分钟保存为文件,然后将
其添加到HDFS。
Flume是Cloudera开发的一个比较新的日志聚合器。它支持可扩展的 “管道 “和 “数据下沉”,使流式日志数据的传
输非常灵活。它也有更多的集成分布式支持。
但是,这些系统大多是为离线消耗日志数据而构建的,往往会将实现细节(如 “按分钟保存的文件”)不必要地暴露给消费者。
此外,他们中的大多数都采用了 “推送 “模式,即Broker将数据转发给消费者。
在LinkedIn,我们发现 “拉动 “模式更适合我们的应用,因为每个消费者都能以自己能承受的最大速率检索到消
息,避免被推送的消息淹没在比自己能承受的速度更快的消息中。
拉动模式还可以让消费者很容易回传,我们在
3.2节末尾讨论了这个好处的细节。
最近,雅虎研究公司开发了一种新的分布式pub/sub系统,名为HedWig。HedWig具有高度的可扩展性和可
用性,并提供了强大的持久性保证。不过,它主要是用于存储资料库(data store)的提交日志。
3. Kafka架构和设计原则
由于现有的各种消息系统的局限性,我们开发了一种新的基于消息传递的日志聚合器Kafka。
我们首先介绍一下Kafka中的基本概念。
一个主题定义一个特定类型的消息流。
一个生产者可以向一个主题发布消息。然后,发布的消息被存储在一组称为Broker的服务器上。
一个消费者可以从Broker那里订阅一个或多个主题,并通过从Broker那里提取数据来消费订阅的消息。
从概念上讲,消息传递的定义是比较简单的。同样的,我们试图使Kafka API也一样简单。为了证明这一点,我们
不展示具体的API,而是介绍一些示例代码来展示API的使用方法。
下面给出了生产者的示例代码。一个消息被定义为只包含一个字节的内容。用户可以选择自己喜欢的序列化方
法对消息进行编码。为了提高效率,生产者可以在一次发布请求中发送一组消息。
Sample producer code:
1 | producer = new Producer(...); |
要订阅一个主题,消费者首先要为该主题创建一个或多个消息流(理解为分区)。
发布到该主题的消息将被平均分配到这些子消息流(分区)中。
关于Kafka如何分配消息的细节将在后面的3.2节中描述。
每个消息流在持续产生的消息流上提供了一个迭代器接口。
消费者对消息流中的每个消息进行迭代,并处理消息的内容。
与传统的迭代器不同,消息流迭代器永远不会终止。
如果当前没有更多的消息要消费,迭代器就会阻塞,直到新的消息被发布到主题上。
我们既支持点对点的传递模式,即多个消费者共同消费一个主题中所有消息的单一副本,也支持多个消费者各自检
索一个主题的副本的发布/订阅模式。
Sample consumer code:
1 | streams[] = Consumer.createMessageStreams(“topic1”, 1) for (message : streams[0]) { |
Kafka的整体架构如图1所示。
由于Kafka是分布式的,所以一个Kafka集群通常由多个Broker组成。
为了平衡负载,一个主题被划分成多个分区,每个Broker存储一个或多个分区。
多个生产者和消费者可以同时发布和消费消息。
在第3.1节中,我们将描述Broker上的单个分区的布局,以及我们选择的一些设计选择,以使访问分区的效率更高。
在第3.2节中,我们将描述生产者和消费者在分布式设置中如何与多个Broker交互。
在第3.3节中,我们将讨论Kafka的交付保证(delivery guarantees)。
3.1 单分区的性能
我们在Kafka中做了一些设计的决策,让系统更有效率。
1. 简单的存储方式:Kafka有一个非常简单的存储布局。
一个主题的每个分区对应一个逻辑日志。
在物理上,一个日志被实现为一组大小大致相同的段文件(例如,1GB)。
每当生产者向分区发布消息时,Broker只需将消息附加到最后一个段文件中。
为了更好的性能,我们只有在发布了一定数量的消息后,或者在发布了一定时间后,才会将段文件刷新到磁盘上。
一个消息只有在刷新后才会暴露在消费者面前。
与典型的消息传递系统不同,Kafka中存储的消息没有明确的消息ID。
相反,每条消息都是通过其在日志中的逻辑偏移来寻址。
这避免了维护用于辅助查询的索引结构的开销,这些索引结构将消息id映射到实际的消息位置。
注意,我们提到的消息id是递增的,但不是连续的。为了计算下一条消息的id,我们必须将当前消息的长度加到它的id上。
从现在开始,我们将交替使用消息id和偏移量。
消费者总是顺序消费来自特定分区的消息。
如果消费者确认某个特定的消息偏移,就意味着消费者已经接收到了该分区中该偏移之前的所有消息。
在实际的运行中,消费者向Broker发出异步拉取消息请求,以便有一个缓冲区的数据准备好供应用程序消费。
每个拉取消息请求都包含消费开始的消息的偏移量和可接受的字节数。
每个Broker在内存中保存一个排序的偏移量列表,包括每个段文件中第一个消息的偏移量。Broker
通过搜索偏移量列表来定位所请求的报文所在的段文件,并将数据发回给消费者。
当消费者收到一条消息后,它计算出下一条要消费的消息的偏移量,并在下一次拉取请求中使用它。
Kafka日志和内存中索引的布局如图2所示。每个框显示了一条消息的偏移量。
2. 高效的传输: 我们在Kafka中传输数据的时候非常谨慎。
早前,我们已经表明,生产者可以在一次发送请求中提交一组消息。
虽然消费者API每次迭代一条消息,但在实际运行中,每一个消费者的拉动请求也会检索到多个消
息。一次传输通常是几百个K字节的大小。
我们做出的另一个非常规的选择是避免在Kafka层面缓存消息在内存中。
相反,我们依赖底层文件系统的页面缓存。
这样做的主要好处是避免了双重缓冲,消息就只会缓存在页面缓存中。
这样做还有一个额外的好处,那就是即使在代理进程重启的时候,也能保留热缓存(warm cache)。
由于Kafka根本不在进程中缓存消息,所以它在垃圾回收内存方面的开销非常小,这使得在基于VM
的语言中高效实现是可行的。
最后,由于生产者和消费者都是按顺序访问段文件,而消费者往往会比生产者晚一点,所以正常的
操作系统缓存启发式缓存是非常有效的(缓存直写和预读)。
我们发现,生产者和消费者的性能都与数据大小呈线性关系,最大的数据量可以达到很多T字节。(没看懂)
此外,我们还对消费者的网络访问进行了优化。
Kafka是一个多消费者系统,一条消息可能被不同的消费者应用多次消耗。
从本地文件向远程socket发送字节的典型方法包括以下步骤。
- 从存储介质中读取数据到操作系统中的页面缓存
- 将页面缓存中的数据复制到应用缓冲区
- 将应用缓冲区复制到另一个内核缓冲区
- 将内核缓冲区发送到Socket。
其中包括4个数据复制和2个系统调用。
在Linux和其他Unix操作系统上,存在一个sendfile API,可以直接将字节从文件通道传输到socket
通道。这通常可以避免步骤(2)和(3)中介绍的2个复制和1个系统调用。
Kafka利用sendfile API来有效地将日志段文件中的字节从代理服务器向消费者传递。
3. 无状态的Broker: 与大多数其他消息系统不同,在Kafka中,每个消费者消费了多少消息的信
息不是由Broker维护,而是由消费者自己维护。这样的设计减少了很多的复杂性,也减少了Broker
的开销。
但是,这使得删除消息变得很棘手,因为Broker不知道是否所有的用户都消费了这个消息。
Kafka通过使用简单的基于时间的SLA保留策略解决了这个问题。
如果一条消息在代理中保留的时间超过一定的时间,通常是7天,则会自动删除。
这个方案在实际应用中效果不错。大部分消费者包括离线的消费者,都是按日、按小时或实时完成
消费。由于Kafka的性能不会随着数据量的增大而降低,所以这种长时间保留的方案是可行的。
这种设计有一个重要的副作用。
一个消费者可以故意倒退到一个旧的偏移量,重新消费数据。
这违反了队列的通用规定,但事实证明,这对很多消费者来说是一个必不可少的功能。
例如,当消费者中的应用逻辑出现错误时,应用可以在错误修复后回放某些消息。这对我们的数据仓库或Hadoop系统中的ETL数据加载特别重要。
再比如,被消费的数据可能只是周期性地被刷新到一个持久化存储(例如,全文索引器)。
如果消费者崩溃,未冲洗的数据就会丢失。在这种情况下,消费者可以检查未冲洗的消息的最小偏移量,并在重启时从该偏移量中重新消费。
我们注意到,相比于推送模型,在拉动模型中支持消费者重新消费要容易得多。
3.2 分布式协调处理
现在我们来解释一下生产者和消费者在分布式环境中的执行方式。
每个生产者可以向一个随机的或由分区key和分区函数语义决定的分区发布消息。我们将重点讨论消
费者是如何与Broker互动的。
Kafka有消费者组的概念。
每个消费组由一个或多个消费者组成,共同消费一组被订阅的主题,也就是说,每条消息只传递给
消费组内的一个消费者。
不同的消费者组各自独立消费全套订阅的消息,不需要跨消费者组的协调机制。
同一组内的消费者可以在不同的进程或不同的机器上。
我们的目标是在不引入过多的协调开销的情况下,将存储在Broker中的消息平均分配给组中的所有
消费者。
我们的第一个决定是将一个主题内的分区作为最小的并行单元。
这意味着,在任何时候一个分区的所有消息都只被每个消费组中的一个消费者消费。
假设我们允许多个消费者同时消费一个分区,那么他们就必须协调谁消费什么消息,这就需要加锁和维护状态,会造成一定的额外开销。
相反,在我们的设计中,消费进程只需要在消费者重新平衡负载时进行协调,正常来说这种情况不经常发生。
为了使负载真正平衡,我们需要一个主题中的分区比每个消费组中的消费者多很多。
我们可以通过对一个主题进行更多的分区来达到这个目的。
我们做的第二个决定是不设立中心化的”主控 “节点,而是让消费者以去中心化的方式相互协调。
增加一个主节点会使系统变得复杂化,因为我们不得不进一步担心主节点故障。
为了方便协调,我们采用了一个高度可用的共识服务Zookeeper。
Zookeeper有一个非常简单的、类似于文件系统的API。
人们可以创建一个路径,设置一个路径的值,读取一个路径的值,删除一个路径的值,以及列出一个路径的子路径。
它还可以做一些更有趣的事情。
- 可以在路径上注册一个watcher,当路径的子路径或路径的值发生变化时,可以得到通知
- 可以将路径创建为临时的(相对于持久性的),这意味着如果创建的客户端不在了,路径会被Zookeeper服务器自动删除
- zookeeper将数据复制到多个服务器上,这使得数据的可靠性和可用性很高。
Kafka使用Zookeeper完成以下任务。
检测Broker和消费者的添加和删除
当上述事件发生时,在每个消费者中触发一个再平衡过程
维护消费关系,并跟踪每个分区的消费偏移情况。
具体来说,当每个Broker或消费者启动时,它将其信息存储在Zookeeper中的Broker或消费者注册表中。
Broker注册表包含Broker的主机名和端口,以及存储在其上的主题和分区。
消费者注册表包括消费者所属的消费组,以及它所订阅的主题集合。
每个消费组都与Zookeeper中的一个所有权注册表和一个偏移注册表相关联。
所有权注册表对每个订阅的分区都有一个路径,路径值是当前从这个分区消费的消费者id(我们使用的术语是消费者拥有这个分区)。
偏移注册表为每个订阅的分区存储了该分区中最后一个被消费的消息的偏移量。
Broker注册表、消费者注册表和所有权注册表在 Zookeeper 中创建的路径都是临时的。
偏移注册表中创建的路径是持久的。
如果一个Broker服务器发生故障,其上的所有分区都会自动从Broker注册表中删除。
消费者的故障会导致其在消费者注册表中的记录和所有权注册表中的所有分区记录丢失。
每个消费者都会在Broker注册表和消费者注册表上注册一个Zookeeper的Watcher,每当Broker集合或消费者组
发生变化时,都会收到通知。
在消费者的初始启动过程中,或者当消费者通过Watcher收到关于Broker/消费者变更的通知时,消费者会启动一
个重新平衡过程,以确定它应该消费的新分区。
在算法1中描述了这个过程。
通过从Zookeeper读取Broker和消费者注册表,消费者首先计算每个订阅主题T的可用分区集合(PT)和订阅T的消费者集合(CT)。
对于消费者选择的每个分区,它在所有权注册表中写入自己作为该分区的新所有者。
最后,消费者开始一个线程从拥有的分区中拉出数据,偏移量从存储在偏移注册表中的记录值开始。
当消息从分区中拉出时,消费者会定期更新偏移注册表中的最新消耗的偏移量。
当一个消费组内有多个消费者时,每个消费者都会收到Broker或消费者变更的通知。
但是,通知到达每个消费者的时间上略有不同。
因此,有可能是一个消费者试图夺取仍由另一个消费者拥有的分区的所有权。
当这种情况发生时,第一个消费者只需释放其当前拥有的所有分区,等待一段时间,然后重新尝试重新平衡。
在实践中,重新平衡过程通常只需重试几次就会稳定下来。
当创建一个新的消费者组时,偏移注册表中没有可用的偏移量。
在这种情况下,消费者将使用我们在Broker上提供的API,从每个订阅分区上可用的最小或最大的偏移量开始(取
决于配置)。
3.3 传递保证
一般来说,Kafka只保证至少一次交付语义。
确切一次交付语义通常需要两阶段提交,对于我们的应用来说并不是必须的。
大多数情况下,一个消息会准确地传递给每个消费组一次。
但是,当一个消费组进程崩溃而没有干净关闭的情况下,新接管的消费进程可能会得到一些重复的消息,这些消息
在最后一次偏移成功提交给zookeeper之后。
如果一个应用程序关心重复的问题,那么它必须添加自己的去重复逻辑,要么使用我们返回给消费者的偏移量,要
么使用消息中的一些唯一密钥。这通常是一种比使用两阶段提交更经济的方法。
Kafka保证来自单个分区的消息按顺序传递给消费者。
然而,对于来自不同分区的消息的顺序,Kafka并不保证。
为了避免日志损坏,Kafka在日志中为每个消息存储一个CRC。
如果Broker上有任何I/O错误,Kafka会运行一个恢复过程来删除那些具有不一致CRC的消息。
在消息级别拥有CRC也允许我们在消息产生或消费后检查网络错误。
如果一个Broker宕机,那么存储在其上的任何未被消费的信息都将不可用。
如果一个Broker上的存储系统被永久损坏,任何未被消费的消息都会永远丢失。
在未来,我们计划在Kafka中添加复制功能,以便在多个Broker上冗余存储每一条消息。
4. Kafka在LinkedIn的实践
在本节中,我们将介绍我们如何在LinkedIn使用Kafka。
图3显示了我们部署的简化版本。
在每个运行面向用户服务的数据中心,我们都会部署一个Kafka集群。
前端服务会生成各种日志数据,并分批发布到本地的Kafka的Broker中。
我们依靠硬件负载均衡器将发布请求均匀地分配给Kafka的Broker。
Kafka的在线消费者在同一数据中心内的服务中运行。
我们还在每个数据中心单独部署了一个Kafka集群,用于离线分析,该集群在地理位置上靠近我们的Hadoop集群
和其他数据仓库基础设施。
这个Kafka实例运行一组嵌入式消费者,实时从数据中心的Kafka实例中拉取数据。
然后,我们运行数据加载任务,将数据从这个Kafka的复制集群拉到Hadoop和我们的数据仓库中,在这里我们运
行各种报表作业和数据分析处理。
我们还使用这个Kafka集群进行原型开发,并有能力针对原始事件流运行简单的脚本进行实时查询。
无需过多的调整,整个管道的端到端延迟平均约为10秒,足以满足我们的要求。
目前,Kafka每天积累了数百G字节的数据和近10亿条消息。
随着我们完成对遗留系统的迁移,我们预计这个数字将大幅增长。
未来还会增加更多类型的消息。
当运营人员启动或停止Broker进行软件或硬件维护时,再平衡过程能够自动重定向消费。
我们的跟踪系统还包括一个审计系统,以验证整个管道中的数据没有丢失。
为了方便起见,每条消息都带有时间戳和服务器名称。
我们对每个生产者进行仪器化处理,使其定期生成一个监控事件,记录该生产者在固定时间窗口内为每个主题发布
的消息数量。
生产者将监控事件发布到Kafka的一个单独的主题中。
然后,消费者可以统计他们从一个给定的主题中收到的消息数量,并将这些计数与监测事件进行验证,以验证数据
的正确性。
加载到Hadoop集群中是通过实现一种特殊的Kafka输入格式来完成的,该格式允许MapReduce作业直接从Kafka
中读取数据。
MapReduce作业加载原始数据,然后将其分组和压缩,以便将来进行高效处理。
无状态的Broker和客户端存储消息偏移在这里再次发挥了作用,使得MapReduce任务管理(允许任务失败和重
启)以自然的方式处理数据负载,而不会在任务重启时重复或丢失消息。
只有在任务成功完成后,数据和偏移量才会存储在HDFS中。
我们选择使用Avro作为我们的序列化协议,因为它是高效的,并且支持模式演化。
对于每条消息,我们将其Avro模式的id和序列化的字节存储在有效payload中。
这个模式允许我们执行一个约定,以确保数据生产者和消费者之间的兼容性。
我们使用一个轻量级的模式注册服务来将模式id映射到实际的模式。
当消费者得到一个消息时,它在模式注册表中查找,以检索该模式,该模式被用来将字节解码成对象(这种查找只
需要对每个模式进行一次,因为值是不可更改的)。
5. 实验结果
我们进行了一项实验性研究,将Kafka与Apache ActiveMQ v5.4(一种流行的JMS开源实现)和以性能著称的消息
系统RabbitMQ v2.4进行了比较。
我们使用了ActiveMQ的默认持久化消息存储KahaDB。
虽然这里没有介绍,但我们也测试了另一种AMQ消息存储,发现其性能与KahahaDB非常相似。
只要有可能,我们尽量在所有系统中使用可比性设置。
我们在2台Linux机器上进行了实验,每台机器都有8个2GHz核心,16GB内存,6个磁盘,带RAID 10。
这两台机器用1Gb网络链路连接。其中一台机器作为Broker,另一台机器作为生产者或消费者。
Producer测试:
我们将所有系统中的Broker配置为异步刷新消息到其持久化磁盘中。
对于每个系统,我们运行了一个单一的生产者来发布总共1000万条消息,每条消息的大小为200字节。
我们将Kafka生产者配置为以1和50的大小分批发送消息。
ActiveMQ和RabbitMQ似乎没有一个简单的消息批处理方法,我们假设它使用的是1的批处理大小,结果如图4所示。
x轴代表的是随着时间的推移向Broker发送的数据量,单位为MB,y轴对应的是生产者吞吐量,单位为每秒的消息量。
平均而言,Kafka在批处理大小为1和50的情况下,Kafka可以以每秒5万条和40万条消息的速度分别发布消息。
这些数字比ActiveMQ高了好几个数量级,而且至少是比RabbitMQ高2倍。
Kafka的表现要好得多有几个原因。
首先,Kafka生产者目前不等待Broker的回执,以Broker能处理的速度发送消息。
这大大增加了发布者的吞吐量。
在批处理量为50个的情况下,单个Kafka生产者几乎打满了生产者和Broker之间的1Gb带宽。
这对于日志聚合的情况来说是一个有效的优化,因为数据必须异步发送,以避免在实时服务流量中引入任何延迟。
同时我们注意到,broker在没有回送ack的情况下,不能保证producer每一条发布的消息都能被broker实际接收到。
对于不同类型的日志数据,只要丢掉的消息数量相对较少,以持久化换取吞吐量是可取的。然而,我们确实计划在
未来解决更多关键数据的持久化问题。
其次,Kafka使用有更有效的存储格式。
正常来说,在Kafka中,每个消息的开销是9个字节,而在ActiveMQ中则是144个字节。
这意味着ActiveMQ比Kafka多用了70%的空间来存储同样的1000万条消息。
ActiveMQ的一个开销来自于JMS所要求的沉重的消息头。
另一个开销是维护各种索引结构的成本。
我们观察到,ActiveMQ中最繁忙的线程之一花了大部分时间访问B-Tree来维护消息元数据和状态。
最后,批处理通过摊销RPC开销,大大提高了吞吐量。在Kafka中,50条消息的批处理量几乎提高了一个数量级的
吞吐量。
消费者测试:
在第二个实验中,我们测试了消费者的性能。
同样,对于所有系统,我们使用一个消费者来检索总共1000万条消息。
我们对所有系统进行了配置,使每个拉取请求预取的数据量大致相同–最多1000条消息或约200KB。
对于 ActiveMQ 和 RabbitMQ,我们将消费者确认模式设置为自动。
由于所有的消息都适合在内存中,所以所有的系统都是从底层文件系统的页面缓存或一些内存中的缓冲区中提供数
据。
结果如图5所示。
Kafka平均每秒消费22000条消息,是ActiveMQ和RabbitMQ的4倍多。
我们可以想到几个原因。
首先,由于Kafka有更有效的存储格式,所以消费者从Broker那里传输的字节数更少。
其次,ActiveMQ和RabbitMQ中的Broker都必须维护每一条消息的传递状态。
我们观察到ActiveMQ线程中的一个ActiveMQ线程在这个测试中忙于向磁盘写入KahaDB页面。
相比之下,Kafka代理上没有任何磁盘写入活动。
最后,通过使用sendfile API,Kafka降低了传输开销。
在这一节的最后,我们要指出,实验的目的并不是为了表明其他的消息传递系统不如Kafka。
毕竟,ActiveMQ和RabbitMQ都有比Kafka更多的功能。
主要是为了说明一个定制的系统可能带来的性能提升。
6. 总结与未来展望
我们提出了一个名为Kafka的新型系统,用于处理海量的日志数据流。
与普通消息传递系统一样,Kafka采用了一种基于拉取的消费模型,允许应用程序以自己的速度消费数据,并在需
要的时候随时倒带消费。
通过专注于日志处理应用,Kafka实现了比传统消息系统更高的吞吐量。
同时,它还提供了内置的分布式支持,并且可以进行扩展。我们已经在LinkedIn成功地将Kafka用于离线和在线应
用。
未来,我们有几个方向。
首先,我们计划在多个Broker之间添加内置的消息复制功能,即使在机器故障无法恢复的情况下,我们也可以提
供持久化和数据可用性保证。
我们希望同时支持异步和同步复制模型,以允许在生产者延迟和所提供的保证强度之间进行一些权衡。
一个应用可以根据自己对持久化、可用性和吞吐量的要求,选择合适的冗余级别。
其次,我们希望在Kafka中加入一些流处理能力。
在从Kafka中检索消息后,实时应用经常会执行类似的操作,例如基于窗口的计数,并将每条消息与二级存储中的
记录或与另一个流中的消息连接起来。
在最底层,在发布过程中,通过在join键上对消息进行语义上的分区来支持这种操作,这样,所有用特定键发送的
消息都会进入同一个分区,从而到达一个单一的消费进程。
这为在消费机集群中处理分布式流提供了基础。
在此基础上,我们觉得一个有用的信息流实用程序库,如不同的窗口化函数或连接技术将对这类应用有利。
7. 引用
- http://activemq.apache.org/
- http://avro.apache.org/
- Cloudera’s Flume, https://github.com/cloudera/flume
- http://developer.yahoo.com/blogs/hadoop/posts/2010/06/ena bling_hadoop_batch_processi_1/
- Efficient data transfer through zero copy: https://www.ibm.com/developerworks/linux/library/j- zerocopy/
- Facebook’s Scribe, http://www.facebook.com/note.php?note_id=32008268919
- IBM Websphere MQ: http://www- 01.ibm.com/software/integration/wmq/
- http://hadoop.apache.org/
- http://hadoop.apache.org/hdfs/
- http://hadoop.apache.org/zookeeper/
- http://www.slideshare.net/cloudera/hw09-hadoop-based- data-mining-platform-for-the-telecom-industry
- http://www.slideshare.net/prasadc/hive-percona-2009
- https://issues.apache.org/jira/browse/ZOOKEEPER-775
- JAVA Message Service: http://download.oracle.com/javaee/1.3/jms/tutorial/1_3_1- fcs/doc/jms_tutorialTOC.html.
- Oracle Enterprise Messaging Service: http://www.oracle.com/technetwork/middleware/ias/index- 093455.html
- http://www.rabbitmq.com/
- TIBCO Enterprise Message Service: http://www.tibco.com/products/soa/messaging/
- Kafka, http://sna-projects.com/kafka/